Skip to content
Snippets Groups Projects
Commit 5182013f authored by Daniele Nicolodi's avatar Daniele Nicolodi
Browse files

Move reusable code to ptblab package and modernize

parent 5232f774
No related branches found
No related tags found
No related merge requests found
__pycache__
import click import click
import cryod import cryod
import rpc
from ptblab import rpc
@click.group() @click.group()
......
import asyncio import asyncio
import click import click
import datetime
import eventloop
import cpa28xx import cpa28xx
import rpc
from ptblab import datalogger, eventloop, rpc, utils
ADDRESS = 'tcp://127.0.0.1:65534' ADDRESS = 'tcp://127.0.0.1:65534'
TIMEOUT = 3.0 # seconds TIMEOUT = 3.0 # seconds
async def status(compressor, filename): async def status(compressor, datafile):
fd = open(filename, 'a') if filename else None
async for t in eventloop.tick(): async for t in eventloop.tick():
r = compressor.get_status() r = compressor.get_status()
print(f'{t:.3f} {r.power} {cpa28xx.State(r.state)!s} {r.errors} {r.warnings} {r.coolant_in_temperature:.3f} {r.coolant_out_temperature:.3f}', ts = datetime.datetime.fromtimestamp(t).isoformat(' ', 'milliseconds')
end='\r', flush=True) print(f'{t:.3f} {ts} {r.power} {cpa28xx.State(r.state)!s} {r.errors} {r.warnings} {r.coolant_in_temperature:.3f} {r.coolant_out_temperature:.3f}', end='\033[0K\r')
if fd is not None: datafile.write(t, *r)
print(t, *r, file=fd, flush=True)
@click.command() @click.command()
@click.argument('filename', required=False) @click.argument('filename', required=False)
@click.option('--device', '-d', metavar='DEV', help='Communication port.') @click.option('--device', '-d', metavar='DEV', help='Communication port.')
def main(device, filename): @click.option('--datadir', type=click.Path(), help='Data folder for datalogger mode.')
@click.option('--dataname', default='cryo', metavar='STRING', help='Data name in datalogger mode.')
@utils.config_option(section='cpa28xx')
def main(device, filename, datadir, dataname):
compressor = cpa28xx.CPA28xx(device) compressor = cpa28xx.CPA28xx(device)
server = rpc.Server(ADDRESS, compressor) server = rpc.Server(ADDRESS, compressor)
datafile = datalogger.open_data_file(datadir, dataname, filename)
async def main(): async def main():
await asyncio.gather(status(compressor, filename), server()) await asyncio.gather(status(compressor, datafile), server())
asyncio.set_event_loop_policy(eventloop.EventLoopPolicy()) asyncio.set_event_loop_policy(eventloop.EventLoopPolicy())
asyncio.run(main()) asyncio.run(main())
......
import asyncio
import time
import win32time
async def tick(period=1.0):
loop = asyncio.get_running_loop()
tnext = loop.time()
while True:
yield time.time()
tnext += period
future = loop.create_future()
loop.call_at(tnext, asyncio.futures._set_result_unless_cancelled, future, None)
await future
class HiResSelectorEventLoop(asyncio.SelectorEventLoop):
def __init__(self):
super().__init__()
win32time.set_resolution(1e-3)
self._clock_resolution = win32time.get_resolution().cur
def time(self):
return time.perf_counter()
class EventLoopPolicy(asyncio.DefaultEventLoopPolicy):
_loop_factory = HiResSelectorEventLoop
import functools
import pickle
import zmq
import zmq.asyncio
class RPCError(RuntimeError):
pass
class RPCTimeoutError(TimeoutError):
pass
class Server:
"""Simple asynchronous RPC server."""
def __init__(self, address, obj):
ctx = zmq.asyncio.Context()
self.socket = ctx.socket(zmq.REP)
self.socket.bind(address)
self.obj = obj
async def __call__(self):
while True:
request, args = await self.socket.recv_multipart()
func = getattr(self.obj, request.decode('utf8'))
reply = func(*pickle.loads(args))
await self.socket.send_multipart((b'ACK', pickle.dumps(reply)))
class Client:
"""Simple synchronous RPC client."""
def __init__(self, address, timeout=1.0):
self.ctx = zmq.Context()
self.socket = self.ctx.socket(zmq.REQ)
self.socket.connect(address)
self.timeout = int(timeout * 1000)
def call(self, method, *args):
self.socket.send_multipart((method.encode('utf8'), pickle.dumps(args)))
if not self.socket.poll(self.timeout) & zmq.POLLIN:
raise RPCTimeoutError
reply, s = self.socket.recv_multipart()
values = pickle.loads(s)
if reply != b'ACK':
raise RPCError(s)
return values
def __getattr__(self, name):
return functools.partial(self.call, name)
import rpc
def main():
client = rpc.Client('tcp://127.0.0.1:1234')
n = 0
for _ in range(10):
r = client.call('inc', n)
assert r == n + 1
n = r
print(n)
for _ in range(10):
r = client.inc(n)
assert r == n + 1
n = r
print(n)
if __name__ == '__main__':
main()
import asyncio
import eventloop
import rpc
class Test:
def inc(self, n):
return n + 1
def main():
server = rpc.Server('tcp://127.0.0.1:1234', Test())
asyncio.set_event_loop_policy(eventloop.EventLoopPolicy())
asyncio.run(server())
if __name__ == '__main__':
main()
import ctypes
from collections import namedtuple
from contextlib import contextmanager
ntdll = ctypes.WinDLL('NTDLL.DLL')
resolution = namedtuple('resolution', 'min max cur')
NSEC_PER_SEC = 1000000000
def get_resolution_ns():
"""Query resolution of system timer.
See `NtQueryTimerResolution`
http://undocumented.ntinternals.net/index.html?page=UserMode%2FUndocumented%20Functions%2FTime%2FNtQueryTimerResolution.html
http://www.windowstimestamp.com/description
https://bugs.python.org/issue13845
"""
minimum = ctypes.c_ulong()
maximum = ctypes.c_ulong()
current = ctypes.c_ulong()
r = ntdll.NtQueryTimerResolution(ctypes.byref(minimum), ctypes.byref(maximum), ctypes.byref(current))
# NtQueryTimerResolution returned values are in 100ns units
return resolution(minimum.value * 100, maximum.value * 100, current.value * 100)
def get_resolution():
r = get_resolution_ns()
return resolution(r.min / NSEC_PER_SEC, r.max / NSEC_PER_SEC, r.cur / NSEC_PER_SEC)
def set_resolution_ns(resolution):
"""Set resolution of system timer.
See `NtSetTimerResolution`
http://undocumented.ntinternals.net/index.html?page=UserMode%2FUndocumented%20Functions%2FTime%2FNtSetTimerResolution.html
http://www.windowstimestamp.com/description
https://bugs.python.org/issue13845
"""
# NtSetTimerResolution uses 100ns units
resolution = ctypes.c_ulong(int(resolution // 100))
current = ctypes.c_ulong()
r = ntdll.NtSetTimerResolution(resolution, 1, ctypes.byref(current))
# NtSetTimerResolution uses 100ns units
return current.value * 100
def set_resolution(resolution):
return set_resolution_ns(resolution * NSEC_PER_SEC) / NSEC_PER_SEC
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment