Skip to content
Snippets Groups Projects
Commit 30e7f335 authored by Daniele Nicolodi's avatar Daniele Nicolodi
Browse files

Import

parents
No related branches found
No related tags found
No related merge requests found
import click
import collections
import enum
import struct
import time
import modbuslib
import serial
# See "CPAxxxx Digital Panel User Manual" page 21.
class State(enum.IntEnum):
"""Operating state, register 1."""
IDLING = 0
STARTING = 2
RUNNING = 3
STOPPING = 5
LOCKOUT = 6
ERROR = 7
HELIUM_COOL_DOWN = 8
POWER_RELATED_ERROR = 9
RECOVERED_FROM_ERROR = 15
def __str__(self):
return self._name_
class Compressor(enum.IntEnum):
"""Compressor state, register 2."""
OFF = 0
ON = 1
class BitField(enum.IntFlag):
@classmethod
def fromint(cls, value):
if not value & (1 << 32):
return cls.UNKNOWN
value ^= (1 << 32)
return cls(value)
@classmethod
def frombytes(cls, data):
return cls.fromint(struck.unpack('i', data))
def __str__(self):
return self._name_
class Warning(BitField):
"""Warning state, register 3."""
NONE = 0 # No warnings
COOLANT_IN_HIGH = 1 << 0 # Coolant IN running High
COOLANT_IN_LOW = 1 << 1 # Coolant IN running Low
COOLANT_OUT_HIGH = 1 << 2 # Coolant OUT running High
COOLANT_OUT_LOW = 1 << 3 # Coolant OUT running Low
OIL_HIGH = 1 << 4 # Oil running High
OIL_LOW = 1 << 5 # Oil running Low
HELIUM_HIGH = 1 << 6 # Helium running High
HELIUM_LOW = 1 << 7 # Helium running Low
LOW_PRESSURE_HIGH = 1 << 8 # Low Pressure running High
LOW_PRESSURE_LOW = 1 << 9 # Low Pressure running Low
HIGH_PRESSURE_HIGH = 1 << 10 # High Pressure running High
HIGH_PRESSURE_LOW = 1 << 11 # High Pressure running Low
DELTA_PRESSURE_HIGH = 1 << 12 # Delta Pressure running High
DELTA_PRESSURE_LOW = 1 << 13 # Delta Pressure running Low
STATIC_PRESSURE_HIGH = 1 << 17 # Static Pressure running High
STATIC_PRESSURE_LOW = 1 << 18 # Static Pressure running Low
COLD_HEAD_STALL = 1 << 19 # Cold head motor Stall
UNKNOWN = 1 << 32
class Error(BitField):
"""Error state, register 5."""
NONE = 0 # No Errors
COOLANT_IN_HIGH = 1 << 0 # Coolant IN High
COOLANT_IN_LOW = 1 << 1 # Coolant IN Low
COOLANT_OUT_HIGH = 1 << 2 # Coolant OUT High
COOLANT_OUT_LOW = 1 << 3 # Coolant OUT Low
OIL_HIGH = 1 << 4 # Oil High
OIL_LOW = 1 << 5 # Oil Low
HELIUM_HIGH = 1 << 6 # Helium High
HELIUM_LOW = 1 << 7 # Helium Low
LOW_PRESSURE_HIGH = 1 << 8 # Low Pressure High
LOW_PRESSURE_LOW = 1 << 9 # Low Pressure Low
HIGH_PRESSURE_HIGH = 1 << 10 # High Pressure High
HIGH_PRESSURE_LOW = 1 << 11 # High Pressure Low
DELTA_PRESSURE_HIGH = 1 << 12 # Delta Pressure High
DELTA_PRESSURE_LOW = 1 << 13 # Delta Pressure Low
MOTOR_CURRENT_LOW = 1 << 14 # Motor Current Low
THREE_PHASE_ERROR = 1 << 15 # Three Phase Error
POWER_SUPPLY_ERROR = 1 << 16 # Power Supply Error
STATIC_PRESSURE_HIGH = 1 << 17 # Static Pressure High
STATIC_PRESSURE_LOW = 1 << 18 # Static Pressure Low
UNKNOWN = 1 << 32
class PressureUnits(enum.IntEnum):
"""Pressure units, register 29."""
PSI = 0
BAR = 1
KPA = 2
class TemperatureUnits(enum.IntEnum):
"""Temperature units, register 30."""
FAHRENHEIT = 0
CELSIUS = 1
KELVIN = 2
class Registers:
def __init__(self, registers):
self.identity = lambda x: x
self.converters = []
self.labels = []
self.size = 0
self.frmt = '!'
for frmt, conv, label in registers:
self.frmt += frmt
self.size += struct.calcsize(frmt)
self.converters.append(conv if conv is not None else self.identity)
self.labels.append(label)
self.struct = struct.Struct(self.frmt)
def unpack(self, data):
values = self.struct.unpack(data)
return tuple(conv(v) for v, conv in zip(values, self.converters))
float32 = modbuslib.bytes_to_float32_cdab
regmap = Registers([
( 'h', None, 'State'), # 1
( 'h', None, 'Power'), # 2
( 'i', None, 'Warnings'), # 3-4
( 'i', None, 'Errors'), # 5-6
('4s', float32, 'Coolant In Temperature'), # 7-8
('4s', float32, 'Coolant Out Temperature'), # 9-10
('4s', float32, 'Oil Temperature'), # 11-12
('4s', float32, 'Helium Temperatue'), # 13-14
('4s', float32, 'Low Pressure'), # 15-16
('4s', float32, 'Low Pressure Average'), # 17-18
('4s', float32, 'High Pressure'), # 19-20
('4s', float32, 'High Pressure Average'), # 21-22
('4s', float32, 'Delta Pressure Average'), # 23-24
('4s', float32, 'Motor Current'), # 25-26
('4s', float32, 'Hours of Operation'), # 27-28
])
Status = collections.namedtuple('Status', [x.lower().replace(' ', '_') for x in regmap.labels])
def model(data):
hi, lo = struct.unpack('!BB', data)
major = {1:'8', 2:'9', 3:'10', 4:'11', 5:'28'}
minor = {1:'A1', 2:'01', 3:'02', 4:'03', 5:'H3', 6:'I3', 7:'04', 8:'H4', 9:'05',
10:'H5', 11:'I6', 12:'06', 13:'07', 14:'H7', 15:'I7', 16:'08', 17:'09',
18:'9C', 19:'10', 20:'1I', 21:'11', 22:'12', 23:'13', 24:'14'}
return major[hi] + minor[lo]
configuration = Registers([
( 'h', PressureUnits, 'Pressure Units'), # 29
( 'h', TemperatureUnits, 'Temperature Units'), # 30
( 'h', None, 'Serial Number'), # 31
('2s', model, 'Model'), # 32
( 'h', None, 'Software Revision'), # 33
])
Configuration = collections.namedtuple('Configuration', [x.lower().replace(' ', '_') for x in configuration.labels])
class CPA28xx(modbuslib.Device):
def __init__(self, device, address=16):
bus = modbuslib.Modbus(serial.Serial(device, baudrate=115200, timeout=0.1), debug=False)
super().__init__(bus, address)
def power(self, on):
"""Turn compressor ON or OFF."""
self.write_register(1, 0x0001 if on else 0x00FF)
def get_status(self):
data = self.read_input_registers(1, 28)
values = regmap.unpack(data)
return Status._make(values)
def get_configuration(self):
data = self.read_input_registers(29, 5)
values = configuration.unpack(data)
return Configuration._make(values)
@click.group(invoke_without_command=True)
@click.pass_context
@click.option('--device', '-d', default='COM4', metavar='DEV', help='Communication port.')
def main(ctx, device):
ctx.obj = CPA28xx(device)
if ctx.invoked_subcommand is None:
ctx.invoke(status)
@main.command()
@click.pass_obj
def status(compressor):
config = compressor.get_configuration()
print(f' Model: {config.model}')
print(f' Serial Number: {config.serial_number}')
print(f' Software Revision: {config.software_revision}')
print(f' Pressure Units: {config.pressure_units.name}')
print(f' Temperature Units: {config.temperature_units.name}')
status = compressor.get_status()
print(f' State: {State(status.state).name}')
print(f' Compressor: {Compressor(status.power).name}')
print(f' Warnings: {Warning.fromint(status.warnings)}')
print(f' Errors: {Error.fromint(status.errors)}')
for name, value in zip(regmap.labels[4:], status[4:]):
print(f'{name:>24s}: {value:7.3f}')
@main.command()
@click.argument('x', type=click.Choice(('on', 'off')))
@click.pass_obj
def power(compressor, x):
compressor.power(x == 'on')
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment