# -*- coding: utf-8 -*-
"""
instrumentserver.server.core
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Core functionality of the instrument server.
"""
# TODO: add a signal for when instruments are closed?
# TODO: validator only when the parameter is settable?
# TODO: the BluePrints should probably go into the serialization module.
# TODO: for full functionality in the proxy, we probably need to introduce
# operations for adding parameters/submodules/functions
# TODO: can we also create methods remotely?
import os
import importlib
import inspect
import logging
import random
from dataclasses import dataclass, field
from enum import Enum, unique
from typing import Dict, Any, Union, Optional, Tuple, List, Callable
import zmq
import qcodes as qc
from qcodes import (
Station, Instrument, InstrumentChannel, Parameter, ParameterWithSetpoints)
from qcodes.instrument.base import InstrumentBase
from qcodes.utils.validators import Validator
from .. import QtCore, serialize
from ..base import send, recv
from ..helpers import nestedAttributeFromString, objectClassPath, typeClassPath
__author__ = 'Wolfgang Pfaff', 'Chao Zhou'
__license__ = 'MIT'
logger = logging.getLogger(__name__)
INSTRUMENT_MODULE_BASE_CLASSES = [
Instrument, InstrumentChannel, InstrumentBase
]
InstrumentModuleType = Union[Instrument, InstrumentChannel, InstrumentBase]
PARAMETER_BASE_CLASSES = [
Parameter, ParameterWithSetpoints
]
ParameterType = Union[Parameter, ParameterWithSetpoints]
[docs]@unique
class Operation(Enum):
"""Valid operations for the server."""
#: Get a list of instruments the server has instantiated.
get_existing_instruments = 'get_existing_instruments'
#: Create a new instrument.
create_instrument = 'create_instrument'
#: Get the blueprint of an object.
get_blueprint = 'get_blueprint'
#: Make a call to an object.
call = 'call'
#: Get the station contents as parameter dict.
get_param_dict = 'get_param_dict'
#: Set station parameters from a dictionary.
set_params = 'set_params'
[docs]@dataclass
class InstrumentCreationSpec:
"""Spec for creating an instrument instance."""
#: Driver class as string, in the format "global.path.to.module.DriverClass".
instrument_class: str
#: Name of the new instrument, I separate this from args and kwargs to
# make it easier to be found.
name: str = ''
#: Arguments to pass to the constructor.
args: Optional[Tuple] = None
#: kw args to pass to the constructor.
kwargs: Optional[Dict[str, Any]] = None
[docs]@dataclass
class CallSpec:
"""Spec for executing a call on an object in the station."""
#: Full name of the callable object, as string, relative to the station object.
#: E.g.: "instrument.my_callable" refers to ``station.instrument.my_callable``.
target: str
#: Positional arguments to pass.
args: Optional[Any] = None
#: kw args to pass.
kwargs: Optional[Dict[str, Any]] = None
[docs]@dataclass
class ParameterBluePrint:
"""Spec necessary for creating parameter proxies."""
name: str
path: str
base_class: str
parameter_class: str
gettable: bool = True
settable: bool = True
unit: str = ''
vals: Optional[Validator] = None
docstring: str = ''
setpoints: Optional[List[str]] = None
def __repr__(self) -> str:
return str(self)
def __str__(self) -> str:
return f"{self.name}: {self.parameter_class}"
def tostr(self, indent=0):
i = indent * ' '
ret = f"""{self.name}: {self.parameter_class}
{i}- unit: {self.unit}
{i}- path: {self.path}
{i}- base class: {self.base_class}
{i}- gettable: {self.gettable}
{i}- settable: {self.settable}
{i}- validator: {self.vals}
{i}- setpoints: {self.setpoints}
"""
return ret
def bluePrintFromParameter(path: str, param: ParameterType) -> \
Union[ParameterBluePrint, None]:
base_class = None
for bc in PARAMETER_BASE_CLASSES:
if isinstance(param, bc):
base_class = bc
break
if base_class is None:
logger.warning(f"Blueprints for parameter base type of {param} are "
f"currently not supported.")
return None
bp = ParameterBluePrint(
name=param.name,
path=path,
base_class=typeClassPath(base_class),
parameter_class=objectClassPath(param),
gettable=True if hasattr(param, 'get') else False,
settable=True if hasattr(param, 'set') else False,
unit=param.unit,
docstring=param.__doc__,
)
if hasattr(param, 'set'):
bp.vals = param.vals
if hasattr(param, 'setpoints'):
bp.setpoints = [setpoint.name for setpoint in param.setpoints]
return bp
[docs]@dataclass
class MethodBluePrint:
"""Spec necessary for creating method proxies."""
name: str
path: str
call_signature: inspect.Signature
docstring: str = ''
def __repr__(self):
return str(self)
def __str__(self):
return f"{self.name}{str(self.call_signature)}"
def tostr(self, indent=0):
i = indent * ' '
ret = f"""{self.name}{str(self.call_signature)}
{i}- path: {self.path}
"""
return ret
def bluePrintFromMethod(path: str, method: Callable) -> Union[MethodBluePrint, None]:
sig = inspect.signature(method)
bp = MethodBluePrint(
name=path.split('.')[-1],
path=path,
call_signature=sig,
docstring=method.__doc__,
)
return bp
[docs]@dataclass
class InstrumentModuleBluePrint:
"""Spec necessary for creating instrument proxies."""
name: str
path: str
base_class: str
instrument_module_class: str
docstring: str = ''
parameters: Optional[Dict[str, ParameterBluePrint]] = field(default_factory=dict)
methods: Optional[Dict[str, MethodBluePrint]] = field(default_factory=dict)
submodules: Optional[Dict[str, "InstrumentModuleBluePrint"]] = field(default_factory=dict)
def __repr__(self) -> str:
return str(self)
def __str__(self) -> str:
return f"{self.name}: {self.instrument_module_class}"
def tostr(self, indent=0):
i = indent * ' '
ret = f"""{i}{self.name}: {self.instrument_module_class}
{i}- path: {self.path}
{i}- base class: {self.base_class}
"""
ret += f"{i}- Parameters:\n{i} -----------\n"
for pn, p in self.parameters.items():
ret += f"{i} - " + p.tostr(indent + 4)
ret += f"{i}- Methods:\n{i} --------\n"
for mn, m in self.methods.items():
ret += f"{i} - " + m.tostr(indent + 4)
ret += f"{i}- Submodules:\n{i} -----------\n"
for sn, s in self.submodules.items():
ret += f"{i} - " + s.tostr(indent + 4)
return ret
def bluePrintFromInstrumentModule(path: str, ins: InstrumentModuleType) -> \
Union[InstrumentModuleBluePrint, None]:
base_class = None
for bc in INSTRUMENT_MODULE_BASE_CLASSES:
if isinstance(ins, bc):
base_class = bc
break
if base_class is None:
logger.warning(f"Blueprints for instrument base type of {ins} are "
f"currently not supported.")
return None
bp = InstrumentModuleBluePrint(
name=ins.name,
path=path,
base_class=typeClassPath(base_class),
instrument_module_class=objectClassPath(ins),
docstring=ins.__doc__
)
bp.parameters = {}
bp.methods = {}
bp.submodules = {}
for pn, p in ins.parameters.items():
param_path = f"{path}.{p.name}"
param_bp = bluePrintFromParameter(param_path, p)
if param_bp is not None:
bp.parameters[pn] = param_bp
for elt in dir(ins):
# don't include private methods, or methods that belong to the qcodes
# base classes.
if elt[0] == '_' or hasattr(base_class, elt):
continue
o = getattr(ins, elt)
if callable(o) and not isinstance(o, tuple(PARAMETER_BASE_CLASSES)):
meth_path = f"{path}.{elt}"
meth_bp = bluePrintFromMethod(meth_path, o)
if meth_bp is not None:
bp.methods[elt] = meth_bp
for sn, s in ins.submodules.items():
sub_path = f"{path}.{sn}"
sub_bp = bluePrintFromInstrumentModule(sub_path, s)
if sub_bp is not None:
bp.submodules[sn] = sub_bp
return bp
[docs]@dataclass
class ParameterBroadcastBluePrint:
"""Blueprint to broadcast parameter changes."""
name: str
action: str
value: int = None
unit: str = None
def __init__(self, name: str, action: str, value: int = None, unit: str = None):
self.name = name
self.value = value
self.unit = unit
self.action = action
def __str__(self) -> str:
ret = f"""\"name\":\"{self.name}\": {{
\"action\":\"{self.action}" """
if self.value is not None:
ret = ret + f"\n \"value\":\"{self.value}\""
if self.unit is not None:
ret = ret + f"\n \"unit\":\"{self.unit}\""
ret = ret + f"""\n}}"""
return ret
def __repr__(self):
return str(self)
def pprint(self, indent=0):
i = indent * ' '
ret = f"""name: {self.name}
{i}- action: {self.action}
{i}- value: {self.value}
{i}- unit: {self.unit}
"""
return ret
[docs]@dataclass
class ParameterSerializeSpec:
#: Path of the object to serialize. ``None`` refers to the station as a whole.
path: Optional[str] = None
#: Which attributes to include for each parameter. Default is ['values'].
attrs: List[str] = field(default_factory=lambda: ['values'])
#: Additional arguments to pass to the serialization function
#: :func:`.serialize.toParamDict`.
args: Optional[Any] = field(default_factory=list)
#: Additional kw arguments to pass to the serialization function
#: :func:`.serialize.toParamDict`.
kwargs: Optional[Dict[str, Any]] = field(default_factory=dict)
[docs]@dataclass
class ServerInstruction:
#TODO: Remove set parameterr from the code.
"""Instruction spec for the server.
Valid operations:
- :attr:`Operation.get_existing_instruments` -- get the instruments currently
instantiated in the station.
- **Required options:** -
- **Return message:** dictionary with instrument name and class (as string).
- :attr:`Operation.create_instrument` -- create a new instrument in the station.
- **Required options:** :attr:`.create_instrument_spec`
- **Return message:** ``None``
- :attr:`Operation.call` -- make a call to an object in the station.
- **Required options:** :attr:`.call_spec`
- **Return message:** The return value of the call.
- :attr:`Operation.get_blueprint` -- request the blueprint of an object
- **Required options:** :attr:`.requested_path`
- **Return message:** The blueprint of the object.
- :attr:`Operation.get_param_dict` -- request parameters as dictionary
Get the parameters of either the full station or a single object.
- **Options:** :attr:`.serialization_opts`
- **Return message:** param dict.
"""
#: This is the only mandatory item.
#: Which other fields are required depends on the operation.
operation: Operation
#: Specification for creating an instrument.
create_instrument_spec: Optional[InstrumentCreationSpec] = None
#: Specification for executing a call.
call_spec: Optional[CallSpec] = None
#: Name of the instrument for which we want the blueprint.
requested_path: Optional[str] = None
#: Options for serialization.
serialization_opts: Optional[ParameterSerializeSpec] = None
#: Setting parameters in bulk with a paramDict.
set_parameters: Optional[Dict[str, Any]] = field(default_factory=dict)
#: Generic arguments.
args: Optional[List[Any]] = field(default_factory=list)
#: Generic keyword arguments.
kwargs: Optional[Dict[str, Any]] = field(default_factory=dict)
def validate(self):
if self.operation is Operation.create_instrument:
if not isinstance(self.create_instrument_spec, InstrumentCreationSpec):
raise ValueError('Invalid instrument creation spec.')
if self.operation is Operation.call:
if not isinstance(self.call_spec, CallSpec):
raise ValueError('Invalid call spec.')
[docs]@dataclass
class ServerResponse:
"""Spec for what the server can return.
If the requested operation succeeds, `message` will the return of that operation,
and `error` is None.
See :class:`ServerInstruction` for a documentation of the expected returns.
If an error occurs, `message` is typically ``None``, and `error` contains an
error message or object describing the error.
"""
#: The return message.
message: Optional[Any] = None
#: Any error message occured during execution of the instruction.
error: Optional[Union[None, str, Warning, Exception]] = None
[docs]class StationServer(QtCore.QObject):
"""The main server object.
Encapsulated in a separate object so we can run it in a separate thread.
Port should always be an odd number to allow the next even number to be its corresponding
publishing port.
"""
# We use this to quit the server.
# If this string is sent as message to the server, it'll shut down and close
# the socket. Should only be used from within this module.
# It's randomized in the instantiated server for a little bit of safety.
SAFEWORD = 'BANANA'
#: Signal(str, str) -- emit messages for display in the gui (or other stuff the gui
#: wants to do with it.
#: Arguments: the message received, and the reply sent.
messageReceived = QtCore.Signal(str, str)
#: Signal(int) -- emitted when the server is started.
#: Arguments: the port.
serverStarted = QtCore.Signal(str)
#: Signal() -- emitted when we shut down.
finished = QtCore.Signal()
#: Signal(Dict) -- emitted when a new instrument was created.
#: Argument is the blueprint of the instrument.
instrumentCreated = QtCore.Signal(object)
#: Signal(str, Any) -- emitted when a parameter was set
#: Arguments: full parameter location as string, value.
parameterSet = QtCore.Signal(str, object)
#: Signal(str, Any) -- emitted when a parameter was retrieved
#: Arguments: full parameter location as string, value.
parameterGet = QtCore.Signal(str, object)
#: Signal(str, List[Any], Dict[str, Any], Any) -- emitted when a function was called
#: Arguments: full function location as string, arguments, kw arguments, return value.
funcCalled = QtCore.Signal(str, object, object, object)
def __init__(self,
parent: Optional[QtCore.QObject] = None,
port: int = 5555,
allowUserShutdown: bool = False,
addresses: List[str] = [],
initScript: Optional[str] = None,
) -> None:
super().__init__(parent)
if addresses is None:
addresses = []
if initScript == None:
initScript = ''
self.SAFEWORD = ''.join(random.choices([chr(i) for i in range(65, 91)], k=16))
self.serverRunning = False
self.port = int(port)
self.station = Station()
self.allowUserShutdown = allowUserShutdown
self.listenAddresses = list(set(['127.0.0.1'] + addresses))
self.initScript = initScript
self.broadcastPort = self.port + 1
self.broadcastSocket = None
self.parameterSet.connect(
lambda n, v: logger.info(f"Parameter '{n}' set to: {str(v)}")
)
self.parameterGet.connect(
lambda n, v: logger.info(f"Parameter '{n}' retrieved: {str(v)}")
)
self.funcCalled.connect(
lambda n, args, kw, ret: logger.info(f"Function called:"
f"'{n}', args: {str(args)}, "
f"kwargs: {str(kw)})'.")
)
def _runInitScript(self):
if os.path.exists(self.initScript):
path = os.path.abspath(self.initScript)
env = dict(station=self.station)
exec(open(path).read(), env)
else:
logger.warning(f"path to initscript ({self.initScript}) not found.")
[docs] @QtCore.Slot()
def startServer(self) -> None:
"""Start the server. This function does not return until the ZMQ server
has been shut down."""
logger.info(f"Starting server.")
logger.info(f"The safe word is: {self.SAFEWORD}")
context = zmq.Context()
socket = context.socket(zmq.REP)
for a in self.listenAddresses:
addr = f"tcp://{a}:{self.port}"
socket.bind(addr)
logger.info(f"Listening at {addr}")
# creating and binding publishing socket to broadcast changes
broadcastAddr = f"tcp://*:{self.broadcastPort}"
logger.info(f"Starting publishing server at {broadcastAddr}")
self.broadcastSocket = context.socket(zmq.PUB)
self.broadcastSocket.bind(broadcastAddr)
self.serverRunning = True
if self.initScript not in ['', None]:
logger.info(f"Running init script")
self._runInitScript()
self.serverStarted.emit(addr)
while self.serverRunning:
message = recv(socket)
message_ok = True
response_to_client = None
response_log = None
# Allow the test client from within the same process to make sure the
# server shuts down. This is
if message == self.SAFEWORD:
response_log = 'Server has received the safeword and will shut down.'
response_to_client = ServerResponse(message=response_log)
self.serverRunning = False
logger.warning(response_log)
elif self.allowUserShutdown and message == 'SHUTDOWN':
response_log = 'Server shutdown requested by client.'
response_to_client = ServerResponse(message=response_log)
self.serverRunning = False
logger.warning(response_log)
# If the message is a string we just echo it back.
# This is used for testing sometimes, but has no functionality.
elif isinstance(message, str):
response_log = f"Server has received: {message}. No further action."
response_to_client = ServerResponse(message=response_log)
logger.debug(response_log)
# We assume this is a valid instruction set now.
elif isinstance(message, ServerInstruction):
instruction = message
try:
instruction.validate()
logger.debug(f"Received request for operation: "
f"{str(instruction.operation)}")
logger.debug(f"Instruction received: "
f"{str(instruction)}")
except Exception as e:
message_ok = False
response_log = f'Received invalid message. Error raised: {str(e)}'
response_to_client = ServerResponse(message=None, error=e)
logger.warning(response_log)
if message_ok:
# We don't need to use a try-block here, because
# errors are already handled in executeServerInstruction.
response_to_client = self.executeServerInstruction(instruction)
response_log = f"Response to client: {str(response_to_client)}"
if response_to_client.error is None:
logger.debug(f"Response sent to client.")
logger.debug(response_log)
else:
logger.warning(response_log)
else:
response_log = f"Invalid message type."
response_to_client = ServerResponse(message=None, error=response_log)
logger.warning(f"Invalid message type: {type(message)}.")
logger.debug(f"Invalid message received: {str(message)}")
send(socket, response_to_client)
self.messageReceived.emit(str(message), response_log)
self.broadcastSocket.close()
socket.close()
self.finished.emit()
return True
[docs] def executeServerInstruction(self, instruction: ServerInstruction) \
-> Tuple[ServerResponse, str]:
"""
This is the interpreter function that the server will call to translate the
dictionary received from the proxy to instrument calls.
:param instruction: The instruction object.
:returns: The results returned from performing the operation.
"""
args = []
kwargs = {}
# We call a helper function depending on the operation that is requested.
if instruction.operation == Operation.get_existing_instruments:
func = self._getExistingInstruments
elif instruction.operation == Operation.create_instrument:
func = self._createInstrument
args = [instruction.create_instrument_spec]
elif instruction.operation == Operation.call:
func = self._callObject
args = [instruction.call_spec]
elif instruction.operation == Operation.get_blueprint:
func = self._getBluePrint
args = [instruction.requested_path]
elif instruction.operation == Operation.get_param_dict:
func = self._toParamDict
args = [instruction.serialization_opts]
elif instruction.operation == Operation.set_params:
func = self._fromParamDict
args = [instruction.set_parameters]
else:
raise NotImplementedError
try:
returns = func(*args, **kwargs)
response = ServerResponse(message=returns)
except Exception as err:
response = ServerResponse(message=None, error=err)
return response
def _getExistingInstruments(self) -> Dict:
"""
Get the existing instruments in the station.
:returns: A dictionary that contains the instrument name and its class name.
"""
comps = self.station.components
info = {k: v.__class__ for k, v in comps.items()}
return info
def _createInstrument(self, spec: InstrumentCreationSpec) -> None:
"""Create a new instrument on the server."""
sep_class = spec.instrument_class.split('.')
modName = '.'.join(sep_class[:-1])
clsName = sep_class[-1]
mod = importlib.import_module(modName)
cls = getattr(mod, clsName)
args = [] if spec.args is None else spec.args
kwargs = dict() if spec.kwargs is None else spec.kwargs
new_instrument = qc.find_or_create_instrument(
cls, name=spec.name, *args, **kwargs)
if new_instrument.name not in self.station.components:
self.station.add_component(new_instrument)
self.instrumentCreated.emit(
bluePrintFromInstrumentModule(new_instrument.name,
new_instrument)
)
def _callObject(self, spec: CallSpec) -> Any:
"""Call some callable found in the station."""
obj = nestedAttributeFromString(self.station, spec.target)
args = spec.args if spec.args is not None else []
kwargs = spec.kwargs if spec.kwargs is not None else {}
ret = obj(*args, **kwargs)
# Check if a new parameter is being created.
self._newOrDeleteParameterDetection(spec, args, kwargs)
if isinstance(obj, Parameter):
if len(args) > 0:
self.parameterSet.emit(spec.target, args[0])
# Broadcast changes in parameter values.
self._broadcastParameterChange(ParameterBroadcastBluePrint(spec.target, 'parameter-update', args[0]))
else:
self.parameterGet.emit(spec.target, ret)
# Broadcast calls of parameters.
self._broadcastParameterChange(ParameterBroadcastBluePrint(spec.target, 'parameter-call', ret))
else:
self.funcCalled.emit(spec.target, args, kwargs, ret)
return ret
def _getBluePrint(self, path: str) -> Union[InstrumentModuleBluePrint,
ParameterBluePrint,
MethodBluePrint]:
obj = nestedAttributeFromString(self.station, path)
if isinstance(obj, tuple(INSTRUMENT_MODULE_BASE_CLASSES)):
return bluePrintFromInstrumentModule(path, obj)
elif isinstance(obj, tuple(PARAMETER_BASE_CLASSES)):
return bluePrintFromParameter(path, obj)
elif callable(obj):
return bluePrintFromMethod(path, obj)
else:
raise ValueError(f'Cannot create a blueprint for {type(obj)}')
def _toParamDict(self, opts: ParameterSerializeSpec) -> Dict[str, Any]:
if opts.path is None:
obj = self.station
else:
obj = [nestedAttributeFromString(self.station, opts.path)]
includeMeta = [k for k in opts.attrs if k != 'value']
return serialize.toParamDict(obj, *opts.args, includeMeta=includeMeta,
**opts.kwargs)
def _fromParamDict(self, params: Dict[str, Any]):
return serialize.fromParamDict(params, self.station)
def _broadcastParameterChange(self, blueprint: ParameterBroadcastBluePrint):
"""
Broadcast any changes to parameters in the server.
The message is composed of a 2 part array. The first item is the name of the instrument the parameter is from,
with the second item being the string of the blueprint in dict format.
This is done to allow subscribers to subscribe to specific instruments.
:param blueprint: The parameter broadcast blueprint that is being broadcast
"""
self.broadcastSocket.send_string(blueprint.name.split('.')[0], flags=zmq.SNDMORE)
self.broadcastSocket.send_string((blueprint.toDictFormat()))
logger.info(f"Parameter {blueprint.name} has broadcast an update of type: {blueprint.action},"
f" with a value: {blueprint.value}.")
def _newOrDeleteParameterDetection(self, spec, args, kwargs):
"""
Detects if the call action is being used to create a new parameter or deletes an existing parameter.
If so, it creates the parameter broadcast blueprint and broadcast it.
:param spec: CallSpec object being passed to the call method.
:param args: args being passed to the call method.
:param kwargs: kwargs being passed to the call method.
"""
if spec.target.split('.')[-1] == 'add_parameter':
name = spec.target.split('.')[0] + '.' + '.'.join(spec.args)
pb = ParameterBroadcastBluePrint(name,
'parameter-creation',
kwargs['initial_value'],
kwargs['unit'])
self._broadcastParameterChange(pb)
elif spec.target.split('.')[-1] == 'remove_parameter':
name = spec.target.split('.')[0] + '.' + '.'.join(spec.args)
pb = ParameterBroadcastBluePrint(name,
'parameter-deletion')
self._broadcastParameterChange(pb)
[docs]def startServer(port: int = 5555,
allowUserShutdown: bool = False,
addresses: List[str] = [],
initScript: Optional[str] = None) -> \
Tuple[StationServer, QtCore.QThread]:
"""Create a server and run in a separate thread.
:returns: The server object and the thread it's running in.
"""
server = StationServer(port=port,
allowUserShutdown=allowUserShutdown,
addresses=addresses,
initScript=initScript)
thread = QtCore.QThread()
server.moveToThread(thread)
server.finished.connect(thread.quit)
thread.started.connect(server.startServer)
thread.start()
return server, thread