"""Tools which are shared among multiple Oxford Instruments devices
This provides a class which can be overridden by the various instruments from
Oxford Instruments. The `OxfordCommon` class provides the initialization and
communication routines common to the ITC503, the PS120, and the IPS120.
"""
import logging
from time import sleep, time
from src.core.instrument import visa
from src.instruments.pyvisa import vpp43_constants as vpc
from src.instruments.pyvisa.vpp43 import get_attribute, flush, set_buffer
from src.tools import config_parser as cp
from src.tools.general import frange
# Names of the communication protocols, as presented to the user.
PROTOCOLS = ['GPIB', 'Serial', 'Gateway Master', 'Gateway Slave', 'ISOBUS']

# Human-readable serial port settings: the accepted choices for each option
# and the value used for each option by default.
SERIAL_ALLOWED = {
    'baud_rate': ['300', '600', '2400', '4800', '9600', '19200'],
    'parity': ['None', 'Odd', 'Even', 'Mark', 'Space'],
    'data_bits': ['7', '8'],
    'stop_bits': ['1.0', '1.5', '2.0'],
}
SERIAL_DEFAULTS = {
    'baud_rate': '9600',
    'parity': 'None',
    'data_bits': '8',
    'stop_bits': '1.0',
}

# Template for the error reported when a response does not echo the command.
# Fields, in order: instrument name, response, command.
_NONSENSE_ERROR = '%s returned nonsense (%s) on command %s.'

# Serial buffer size passed to `set_buffer`, and the masks selecting both the
# input and the output buffer for sizing and for flushing.
_SERIAL_SIZE = 4096
_SERIAL_SIZE_MASK = vpc.VI_ASRL_IN_BUF + vpc.VI_ASRL_OUT_BUF
_SERIAL_FLUSH_MASK = vpc.VI_ASRL_IN_BUF_DISCARD + vpc.VI_ASRL_OUT_BUF_DISCARD

# Lookup tables translating the human-readable settings above into the values
# handed to the VISA layer. Baud rates and data bits are plain integers.
_SERIAL_CONVERT_BAUD_RATE = dict(
    (rate, int(rate)) for rate in SERIAL_ALLOWED['baud_rate'])
_SERIAL_CONVERT_PARITY = {
    'None': vpc.VI_ASRL_PAR_NONE,
    'Odd': vpc.VI_ASRL_PAR_ODD,
    'Even': vpc.VI_ASRL_PAR_EVEN,
    'Mark': vpc.VI_ASRL_PAR_MARK,
    'Space': vpc.VI_ASRL_PAR_SPACE,
}
_SERIAL_CONVERT_DATA_BITS = dict(
    (bits, int(bits)) for bits in SERIAL_ALLOWED['data_bits'])
_SERIAL_CONVERT_STOP_BITS = {
    '1.0': vpc.VI_ASRL_STOP_ONE,
    '1.5': vpc.VI_ASRL_STOP_ONE5,
    '2.0': vpc.VI_ASRL_STOP_TWO,
}
_SERIAL_CONVERT_FLOW = {
    'None': vpc.VI_ASRL_FLOW_NONE,
    'XON/XOFF': vpc.VI_ASRL_FLOW_XON_XOFF,
    'RTS/CTS': vpc.VI_ASRL_FLOW_RTS_CTS,
    'XON/XOFF & RTS/CTS': (vpc.VI_ASRL_FLOW_XON_XOFF +
                           vpc.VI_ASRL_FLOW_RTS_CTS),
    'DTR/DSR': vpc.VI_ASRL_FLOW_DTR_DSR,
    'XON/XOFF & DTR/DSR': (vpc.VI_ASRL_FLOW_XON_XOFF +
                           vpc.VI_ASRL_FLOW_DTR_DSR),
}
def _convertSerialDictionary(serial):
    """Translate human-readable serial settings into instrument values.

    Parameters
    ----------
    serial : dict
        A dictionary with the keys 'baud_rate', 'parity', 'data_bits', and
        'stop_bits', each holding one of the strings used as keys in the
        matching ``_SERIAL_CONVERT_*`` table.

    Returns
    -------
    dict
        A new dictionary with the same four keys, holding the converted
        values.

    Raises
    ------
    KeyError
        If a key is missing from `serial` or its value is not in the
        matching conversion table.
    """
    conversionTables = (('baud_rate', _SERIAL_CONVERT_BAUD_RATE),
                        ('parity', _SERIAL_CONVERT_PARITY),
                        ('data_bits', _SERIAL_CONVERT_DATA_BITS),
                        ('stop_bits', _SERIAL_CONVERT_STOP_BITS))
    converted = {}
    for option, table in conversionTables:
        converted[option] = table[serial[option]]
    return converted
class OxfordCommon(object):
    """This is a class to perform actions common to most Oxford Instruments
    devices, including the ITC503, the PS120, and the IPS120.

    Parameters
    ----------
    name : str
        A name to identify the instrument
    protocol : {'ISOBUS', 'GPIB', 'Serial', 'Gateway Master', 'Gateway Slave'}
        The protocol for communication between the computer and the power
        supply. Matching ignores case and spaces; 'Master' and 'Slave' are
        accepted as short forms of the two gateway protocols. Any
        unrecognized value is treated as 'ISOBUS'.
    isobusAddress : str
        An integer string representing the ISOBUS address, if relevant. An
        integer will be accepted and converted.
    visaAddress : str
        A full VISA resource address (including the bus) to locate the
        instrument (e.g. "GPIB0::27").
    serialConfig : dict
        A dictionary to indicate how to configure a serial port, which is used
        with both the 'ISOBUS' and 'Serial' protocols. If omitted,
        `SERIAL_DEFAULTS` is used when a serial port is opened.

    Methods
    -------
    openCommunication()
        Open a new (protocol-specific) communication channel between the
        computer and the instrument, initializing the ports and sending
        device clears as appropriate.
    closeCommunication()
        Close the communication channel between the computer and the
        instrument, freeing reserved resources.
    communicate(command)
        Send a command (str) to the instrument and read its response.
    """

    def __init__(self, name='Magnet', protocol='ISOBUS', isobusAddress='0',
                 visaAddress='GPIB0::23', serialConfig=None):
        """Initialize a new power supply object."""
        self._name = name
        self._inst = None
        self._vi = None

        # Normalize the protocol once (case, spaces, short aliases) so that
        # `communicate` and `openCommunication` always agree on it.
        normalized = protocol.lower().replace(' ', '')
        aliases = {'master': 'gatewaymaster', 'slave': 'gatewayslave'}
        self._protocol = aliases.get(normalized, normalized)

        # `communicate` is bound per instance to the protocol-specific
        # implementation; anything unrecognized falls back to ISOBUS.
        handlers = {'gpib': self._communicateGPIB,
                    'serial': self._communicateSerial,
                    'gatewaymaster': self._communicateGateway,
                    'gatewayslave': self._communicateGateway}
        self.communicate = handlers.get(self._protocol,
                                        self._communicateISOBUS)

        if isinstance(isobusAddress, int):
            self._isobus = '%d' % isobusAddress
        else:
            self._isobus = isobusAddress
        self._visa = visaAddress

        # Always define `_serial` so that opening a serial port without an
        # explicit configuration does not fail on a missing attribute.
        self._serial = None
        if serialConfig is not None:
            self._serial = _convertSerialDictionary(serialConfig)

    #=======================================================================
    # Initialization and finalization
    #=======================================================================

    def openCommunication(self):
        """Initialize the instrument.

        Open a new (protocol-specific) communication channel between the
        computer and the instrument, initializing the ports and sending
        device clears as appropriate. The protocol is the one given to the
        constructor: 'GPIB' and 'Gateway Master' open a GPIB session,
        'Gateway Slave' opens nothing, and everything else (including
        'Serial' and 'ISOBUS') opens a serial port.
        """
        if self._protocol in ('gpib', 'gatewaymaster'):
            self._openCommunicationGPIB()
        elif self._protocol == 'gatewayslave':
            self._openCommunicationGatewaySlave()
        else:
            self._openCommunicationISOBUS()

    def _openCommunicationISOBUS(self):
        """Initialize the instrument for RS232 or ISOBUS communication."""
        serialOptions = self._serial
        if serialOptions is None:
            # No configuration was supplied; use the module defaults.
            serialOptions = _convertSerialDictionary(SERIAL_DEFAULTS)
        self._inst = visa.SerialInstrument(self._visa, **serialOptions)
        self._vi = self._inst.vi
        sleep(0.1)
        self._serialFlushBuffer()
        set_buffer(self._vi, _SERIAL_SIZE_MASK, _SERIAL_SIZE)

    def _openCommunicationGPIB(self):
        """Initialize the instrument for GPIB communication."""
        self._inst = visa.instrument(self._visa, term_chars='\r')
        self._inst.clear()

    def _openCommunicationGatewaySlave(self):
        """Initialize the power supply as a gateway slave, i.e., do nothing."""
        pass

    def closeCommunication(self):
        """Finalize the instrument.

        Close the underlying instrument, if one is open, and forget its
        handles so that calling this method again is harmless.
        """
        if self._inst is None:
            return
        try:
            self._inst.close()
        finally:
            self._inst = None
            self._vi = None

    #=======================================================================
    # Communication
    #=======================================================================

    def _communicateSerial(self, command):
        """Communicate over a serial port."""
        self._serialFlushBuffer()
        return self._communicateGeneral(command)

    def _communicateGPIB(self, command):
        """Communicate over a GPIB port."""
        return self._communicateGeneral(command)

    def _communicateGateway(self, command):
        """Communicate with a gateway system."""
        return self._communicateGeneral(self._formatForIsobus(command))

    def _communicateISOBUS(self, command):
        """Communicate over ISOBUS."""
        self._serialFlushBuffer()
        return self._communicateGeneral(self._formatForIsobus(command))

    def _communicateGeneral(self, command):
        """Send a command to the power supply, and read the response.

        Parameters
        ----------
        command : str
            A string representing a valid command for an OI instrument. All
            necessary modifications (e.g. adding an ISOBUS address) should
            have already been made.

        Returns
        -------
        str
            The instrument's response with `command` stripped, or an empty
            string for commands beginning with 'Q' or '$', which are written
            without reading a reply.

        Raises
        ------
        Exception
            If the response does not begin with `command`.
        """
        sleep(0.1)
        # NOTE(review): an ISOBUS-formatted 'Q' command begins with '@', so
        # it is not caught here and a reply is awaited -- confirm intended.
        if command.startswith('Q') or command.startswith('$'):
            self._inst.write(command)
            return ''
        response = self._inst.ask(command + '\r')
        # The reply is expected to echo the command before the payload.
        if not response.startswith(command):
            message = _NONSENSE_ERROR % (self._name, response, command)
            logging.critical(message)
            raise Exception(message)
        return response[len(command):].strip()

    def _formatForIsobus(self, command):
        """Modify a command to include the ISOBUS address.

        Parameters
        ----------
        command : str
            The basic command, without an ISOBUS address.

        Returns
        -------
        str
            The command with the ISOBUS added, ensuring that any $s remain at
            the beginning
        """
        if command.startswith('$'):
            return '$@' + self._isobus + command[1:]
        return '@' + self._isobus + command

    #=======================================================================
    # Serial port tools
    #=======================================================================

    def _serialQueryBytesAtPort(self):
        """Query the data at the serial port.

        Returns
        -------
        int
            An integer indicating the number of bytes at the serial port for
            this instrument.
        """
        return int(get_attribute(self._vi, vpc.VI_ATTR_ASRL_AVAIL_NUM))

    def _serialFlushBuffer(self):
        """Flush the serial buffer if there are bytes at the port."""
        if self._serialQueryBytesAtPort() > 0:
            sleep(0.05)
            flush(self._vi, _SERIAL_FLUSH_MASK)
            sleep(0.1)
#===============================================================================
# General functions
#===============================================================================
def waitForStableTemperature(targetTemperature, measurementFunction,
                             allowedDeviation, deviationType='percent',
                             stabilizedTime=60.0, timeout=600.0):
    """Wait for the temperature to stabilize.

    Wait for the temperature to become steady (within some specified tolerance)
    at a particular target temperature. There are two timers in use in this
    method. One is the total time elapsed since the function starts, and the
    other is the time over which the temperature has been within the tolerance
    (i.e., it resets whenever the temperature goes out of range). The function
    ends when either the latter timer reaches `stabilizedTime` or when the
    former reaches `timeout`, whichever comes first.

    The loop itself does not pause between readings, so the polling rate is
    set by how long `measurementFunction` takes to return. Progress is
    printed to standard output on every reading.

    Parameters
    ----------
    targetTemperature : float
        The temperature at which to stabilize.
    measurementFunction : function
        The function or method to use to measure the temperature. It is
        called with no arguments and must return a number.
    allowedDeviation : float
        By how much the temperature can vary and still be considered to be
        at the target temperature. Only the magnitude matters.
    deviationType : str {'percent', 'absolute'}
        Whether `allowedDeviation` should be treated as a percentage of the
        target temperature or an absolute temperature in Kelvin. For example,
        assume that the target temperature is 10.0 K, and `allowedDeviation`
        is 5. If `deviationType` is 'percent', then the temperature will
        be accepted as at the target if it is between 9.5 K and 10.5 K,
        inclusive. If `deviationType` is 'absolute', the accepted range is
        5.0 K to 15.0 K, inclusive. Any value other than 'percent' is
        treated as 'absolute'.
    stabilizedTime : float
        The minimum time, in seconds, over which the temperature must remain
        within the accepted range.
    timeout : float
        The time, in seconds, after which no new reading is started. A
        reading already in progress is allowed to finish, so the function
        may return slightly later than this.

    Returns
    -------
    bool
        `True` if the temperature has been within the allowed range long enough
        to be considered stable, or `False` if the function times out.
    """
    if deviationType == 'percent':
        fraction = allowedDeviation / 100.0
        firstBound = (1.0 - fraction) * targetTemperature
        secondBound = (1.0 + fraction) * targetTemperature
    else:
        firstBound = targetTemperature - allowedDeviation
        secondBound = targetTemperature + allowedDeviation
    # A negative target (or deviation) would otherwise invert the window and
    # make it impossible for any reading to be accepted.
    validMin, validMax = sorted((firstBound, secondBound))

    startTime = time()
    currTime = startTime
    # Time at which the current in-range streak began; None when the most
    # recent reading was out of range.
    startTimeAtTemp = None

    while currTime - startTime < timeout:
        currTime = time()
        currTemp = measurementFunction()
        print('Total running time: %.3f' % (currTime - startTime))
        print('Current temp: ' + str(currTemp))

        if not (validMin <= currTemp <= validMax):
            startTimeAtTemp = None
            print('Not close enough. Restarting timer.')
            continue

        if startTimeAtTemp is None:
            startTimeAtTemp = currTime
        timeAtTemp = currTime - startTimeAtTemp
        print('Time at temp: %.3f' % timeAtTemp)
        if timeAtTemp >= stabilizedTime:
            print('At temp long enough. Stopping.')
            return True

    return False
def expandRange(initial, final, stepArray):
    """Expand a range, getting the step sizes from an array.

    Parameters
    ----------
    initial : float
        The value at which the returned array should start.
    final : float
        The final value in the returned array.
    stepArray : list of 3-tuple of float
        A list of tuples specifying step sizes. Each tuple should consist
        of three elements. The last is the step size, the first is the lowest
        value for which the step size should be used, and the second is
        the highest value for which the step size should be used.

    Returns
    -------
    list of float
        A list of floats spanning from `initial` to `final` (inclusive), where
        the step sizes vary based on the value. If `initial` is greater than
        `final`, the list is built in ascending order and then reversed.
    """
    # Always build the list in ascending order; reverse at the end if the
    # caller asked for a descending range.
    goingDown = initial > final
    if goingDown:
        initial, final = final, initial

    ans = []
    # True while walking through the segments between the one containing
    # `initial` and the one containing `final`.
    going = False
    for start, stop, step in stepArray:
        if start <= final < stop:
            # Last segment. When the range also begins inside this segment,
            # start from `initial` rather than from the segment's lower bound.
            lower = start if going else max(start, initial)
            going = False
            ans.extend(frange(lower, final, step))
        elif start <= initial < stop:
            going = True
            ans.extend(frange(initial, stop, step))
        elif going:
            ans.extend(frange(start, stop, step))

    if going:
        # `final` lies beyond every segment: keep going with the step size
        # of the last segment.
        ans.extend(frange(ans[-1], final, stepArray[-1][2]))
    ans.append(final)

    if goingDown:
        ans.reverse()
    return ans
def readAddressConfig(configurationFile, section):
    """Read the configuration for an Oxford instrument.

    Parameters
    ----------
    configurationFile : str
        The absolute path to the configuration file containing the information
        about the instrument.
    section : str
        The section within the configuration file containing the information
        about the relevant instrument.

    Returns
    -------
    dict
        A dictionary with the following keys.

        'protocol' : str
            The communication protocol as read from the file, converted to
            lower case (the default is 'isobus').
        'visaAddress' : str
            The value of the 'gpib_address' option.
        'isobusAddress' : str
            The value of the 'isobus_address' option.
        'serialConfig' : dict or None
            `None` unless the protocol is 'isobus' or 'serial', in which
            case it is a dictionary with the keys 'baud_rate', 'parity',
            'data_bits', and 'stop_bits' holding the values read from the
            file.
    """
    serialOptions = ('baud_rate', 'parity', 'data_bits', 'stop_bits')

    # Fallback values, keyed by (section, option).
    defaults = {
        (section, 'protocol'): 'ISOBUS',
        (section, 'gpib_address'): 'GPIB0::24',
        (section, 'isobus_address'): '0',
        (section, 'serial_baud_rate'): '9600',
        (section, 'serial_parity'): 'None',
        (section, 'serial_data_bits'): '8',
        (section, 'serial_stop_bits'): '1.0',
    }
    parser = cp.ConfigParser(configurationFile, cp.FORMAT_BASIC,
                             defaultValues=defaults)

    result = {'protocol': parser.get(section, 'protocol').lower(),
              'visaAddress': parser.get(section, 'gpib_address'),
              'isobusAddress': parser.get(section, 'isobus_address'),
              'serialConfig': None}

    # Serial settings are only read for the protocols that use a serial port.
    if result['protocol'] in ('isobus', 'serial'):
        result['serialConfig'] = dict(
            (option, parser.get(section, 'serial_' + option))
            for option in serialOptions)
    return result
# Serial port flow control does not appear to be implemented. Here are the
# allowed values, just in case they're needed in the future.
# 'flow_control': ['None', 'XON/XOFF', 'RTS/CTS',
# 'XON/XOFF & RTS/CTS', 'DTR/DSR',
# 'XON/XOFF & DTR/DSR']
# default = 'None'