Source code for instrumentserver.client.proxy

# -*- coding: utf-8 -*-
"""
Created on Sat Apr 18 16:13:40 2020

@author: Chao
"""
import inspect
import json
import logging
import os
from types import MethodType
from typing import Any, Union, Optional, Dict, List

import qcodes as qc
import zmq
from qcodes import Instrument, Parameter
from qcodes.instrument.base import InstrumentBase

from instrumentserver import QtCore, DEFAULT_PORT
from instrumentserver.server.core import (
    ServerInstruction,
    InstrumentModuleBluePrint,
    ParameterBluePrint,
    MethodBluePrint,
    CallSpec,
    Operation,
    InstrumentCreationSpec,
    ParameterSerializeSpec,
)
from .core import sendRequest, BaseClient

logger = logging.getLogger(__name__)


# TODO: enable creation of instruments through yaml files/station configurator.
# TODO: support for channel lists
# TODO: support for other parameter classes.
# FIXME: need to generally find the imports we need for type annotations!
# TODO: convenience function to refresh from server.


class ProxyMixin:
    """Mixin holding the server-connection details shared by proxy objects.

    A proxy is described by a blue print that is either handed in directly
    (``bluePrint``) or requested from the server for ``remotePath``. The blue
    print is converted into constructor keyword arguments for the next class
    in the MRO through :meth:`initKwargsFromBluePrint`.

    :param cli: Client used to talk to the server. If ``None``, requests are
        sent with ``sendRequest`` to ``host``/``port`` instead.
    :param host: The name of the host where the server lives.
    :param port: The port number of the server.
    :param remotePath: Path of the remote object on the server.
    :param bluePrint: Blue print describing the remote object. If both
        ``remotePath`` and ``bluePrint`` are given, the blue print takes
        priority and ``remotePath`` is taken from ``bluePrint.path``.
    :raises ValueError: If neither ``remotePath`` nor ``bluePrint`` is given.
    """

    def __init__(self, *args,
                 cli: Optional["Client"] = None,
                 host: Optional[str] = 'localhost',
                 port: Optional[int] = DEFAULT_PORT,
                 remotePath: Optional[str] = None,
                 bluePrint: Optional[Union[ParameterBluePrint,
                                           InstrumentModuleBluePrint,
                                           MethodBluePrint]] = None,
                 **kwargs):

        # Connection details must be in place before the blue print is
        # requested below, because that request goes through `askServer`.
        self.cli = cli
        self.host = host
        self.port = port

        if bluePrint is not None:
            self.bp = bluePrint
            self.remotePath = self.bp.path
        elif remotePath is not None:
            self.remotePath = remotePath
            self.bp = self._getBluePrintFromServer(self.remotePath)
        else:
            raise ValueError("Either `remotePath` or `bluePrint` must be "
                             "specified.")

        kwargs.update(self.initKwargsFromBluePrint(self.bp))
        super().__init__(*args, **kwargs)
        self.__doc__ = self.bp.docstring

    def initKwargsFromBluePrint(self, bp):
        """Return the constructor kwargs derived from the blue print ``bp``.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    def askServer(self, message: ServerInstruction):
        """Send ``message`` to the server and return its reply.

        The client ``cli`` is used when available; otherwise the request is
        sent directly to ``host``/``port``.

        :raises RuntimeError: If neither a client nor a host/port pair is
            configured. Previously this case returned ``None`` silently,
            which was indistinguishable from a legitimate empty reply.
        """
        if self.cli is not None:
            return self.cli.ask(message)
        if self.host is not None and self.port is not None:
            return sendRequest(message, self.host, self.port)
        raise RuntimeError(
            "Cannot reach the server: neither a client nor a host/port pair "
            "is configured for this proxy."
        )

    def _getBluePrintFromServer(self, path):
        """Request the blue print of the object at ``path`` from the server."""
        req = ServerInstruction(
            operation=Operation.get_blueprint,
            requested_path=path
        )
        return self.askServer(req)

    def snapshot(self, *args, **kwargs):
        """Call ``snapshot`` on the remote object and return the reply.

        All positional and keyword arguments are forwarded to the remote call.
        """
        req = ServerInstruction(
            operation=Operation.call,
            call_spec=CallSpec(
                target=self.remotePath + '.snapshot', args=args, kwargs=kwargs
            )
        )
        return self.askServer(req)


class ProxyParameter(ProxyMixin, Parameter):
    """Client-side stand-in for a parameter that lives on the server.

    Getting and setting the proxy is translated into calls on the remote
    parameter, according to what the blue print declares as gettable/settable.

    :param name: The parameter name.
    :param cli: Instance of `Client`.
    :param host: The name of the host where the server lives.
    :param port: The port number of the server.
    :param remotePath: Path of the remote object on the server.
    :param bluePrint: The blue print to construct the proxy parameter.
        If `remotePath` and `bluePrint` are both supplied, the blue print takes
        priority.
    :param setpoints_instrument: Object on which the setpoint names listed in
        the blue print are looked up as attributes.
    """

    def __init__(self, name: str, *args,
                 cli: Optional["Client"] = None,
                 host: Optional[str] = 'localhost',
                 port: Optional[int] = DEFAULT_PORT,
                 remotePath: Optional[str] = None,
                 bluePrint: Optional[ParameterBluePrint] = None,
                 setpoints_instrument: Optional[Instrument] = None,
                 **kwargs):

        super().__init__(name, *args, cli=cli, host=host, port=port,
                         remotePath=remotePath, bluePrint=bluePrint,
                         **kwargs)

        # Setpoints are only attached when the blue print names some and we
        # have an object to resolve those names on.
        setpoint_names = self.bp.setpoints
        if setpoint_names is None or setpoints_instrument is None:
            return
        self.setpoints = [getattr(setpoints_instrument, sp_name)
                          for sp_name in setpoint_names]

    def initKwargsFromBluePrint(self, bp):
        """Translate the parameter blue print into `Parameter` init kwargs."""
        return {
            # `False` disables the corresponding command.
            'set_cmd': self._remoteSet if bp.settable else False,
            'get_cmd': self._remoteGet if bp.gettable else False,
            'unit': bp.unit,
            'vals': bp.vals,
            'docstring': bp.docstring,
        }

    def _remoteSet(self, value: Any):
        """Call the remote parameter with ``value`` as its only argument."""
        spec = CallSpec(target=self.remotePath, args=(value,))
        instruction = ServerInstruction(operation=Operation.call,
                                        call_spec=spec)
        return self.askServer(instruction)

    def _remoteGet(self):
        """Call the remote parameter without arguments and return the reply."""
        spec = CallSpec(target=self.remotePath)
        instruction = ServerInstruction(operation=Operation.call,
                                        call_spec=spec)
        return self.askServer(instruction)


class ProxyInstrumentModule(ProxyMixin, InstrumentBase):
    """Construct a proxy module using the given blue print. Each proxy
    instantiation represents a virtual module (instrument or submodule of an
    instrument).

    Parameters, methods and submodules listed in the blue print are mirrored
    on the proxy; :meth:`update` re-synchronizes them with the server.

    :param name: Name of the proxy module.
    :param cli: Instance of `Client`. It is used directly by :meth:`update`,
        :meth:`add_parameter` and attribute lookup.
    :param host: The name of the host where the server lives.
    :param port: The port number of the server.
    :param remotePath: Path of the remote object on the server.
    :param bluePrint: The blue print that describes the module.
    """

    def __init__(self, name: str, *args,
                 cli: Optional["Client"] = None,
                 host: Optional[str] = 'localhost',
                 port: Optional[int] = DEFAULT_PORT,
                 remotePath: Optional[str] = None,
                 bluePrint: Optional[InstrumentModuleBluePrint] = None,
                 **kwargs):

        super().__init__(name, *args, cli=cli, host=host, port=port,
                         remotePath=remotePath, bluePrint=bluePrint, **kwargs)

        # When the remote object offers `remove_parameter`, install a variant
        # that also refreshes the proxy after the remote removal.
        if 'remove_parameter' in self.bp.methods:
            def remove_parameter(obj, name: str):
                obj.cli.call(f'{obj.remotePath}.remove_parameter', name)
                obj.update()

            self.remove_parameter = MethodType(remove_parameter, self)

        self.parameters.pop('IDN', None)  # we will redefine this later
        self.update()

    def initKwargsFromBluePrint(self, bp):
        """No constructor kwargs are derived from a module blue print."""
        return {}

    def update(self):
        """Fetch the current blue print from the server and re-synchronize the
        proxy's parameters, methods and submodules with it."""
        self.bp = self.cli.getBluePrint(self.remotePath)
        self._getProxyParameters()
        self._getProxyMethods()
        self._getProxySubmodules()

    def add_parameter(self, name: str, *arg, **kw):
        """Add a parameter to the proxy instrument.

        If a parameter of that name already exists in the server-side instrument,
        we only add the proxy parameter.
        If not, we first add the parameter to the server-side instrument, and
        then the proxy here.

        :param name: Name of the parameter.
        :param arg: Positional arguments forwarded to the remote
            ``add_parameter`` call.
        :param kw: Keyword arguments forwarded to the remote
            ``add_parameter`` call.
        :raises ValueError: If the proxy already has a parameter ``name``.
        """

        if name in self.parameters:
            raise ValueError(f'Parameter: {name} already present in the proxy.')

        # The remote object is addressed by its path, like everywhere else in
        # this class.
        bp: InstrumentModuleBluePrint
        bp = self.cli.getBluePrint(self.remotePath)

        # Only create the parameter remotely if the server does not have it
        # yet; otherwise just mirror the existing one.
        if name not in bp.parameters:
            self.cli.call(self.remotePath + ".add_parameter", name, *arg, **kw)
        self.update()

    def _getProxyParameters(self) -> None:
        """Based on the parameter blueprint replied from server, add the
        instrument parameters to the proxy instrument class, and drop proxy
        parameters that are no longer part of the blue print."""

        # note: we can always provide setpoints_instruments, because in case
        # the parameter doesn't, `setpoints` will just be `None`.
        for pn in self.bp.parameters:
            if pn not in self.parameters:
                pbp = self.cli.getBluePrint(f"{self.remotePath}.{pn}")
                super().add_parameter(pbp.name, ProxyParameter, cli=self.cli, host=self.host,
                                      port=self.port, bluePrint=pbp, setpoints_instrument=self)

        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        stale = [pn for pn in self.parameters if pn not in self.bp.parameters]
        for pn in stale:
            del self.parameters[pn]

    def _getProxyMethods(self):
        """Based on the method blue print replied from server, add the
        instrument functions to the proxy instrument class.

        Names that already resolve on the proxy are left untouched.
        """
        for n, m in self.bp.methods.items():
            if not hasattr(self, n):
                fun = self._makeProxyMethod(m)
                setattr(self, n, MethodType(fun, self))
                self.functions[n] = getattr(self, n)

    def _makeProxyMethod(self, bp: MethodBluePrint):
        """Build a function with the call signature stored in ``bp`` (plus a
        leading ``self``) that forwards its arguments to the remote method.

        :param bp: Blue print of the remote method.
        :returns: A plain function, meant to be bound to the proxy instance.
        """
        def wrap(*a, **k):
            msg = ServerInstruction(
                operation=Operation.call,
                call_spec=CallSpec(target=bp.path, args=a, kwargs=k)
            )
            return self.askServer(msg)

        sig = bp.call_signature

        # Spell out how each declared parameter is handed on to `wrap`.
        args = []
        for pn, param in sig.parameters.items():
            kind = param.kind
            if kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD,
                        inspect.Parameter.POSITIONAL_ONLY):
                args.append(f'{pn}')
            elif kind is inspect.Parameter.VAR_POSITIONAL:
                args.append(f"*{pn}")
            elif kind is inspect.Parameter.KEYWORD_ONLY:
                args.append(f"{pn}={pn}")
            elif kind is inspect.Parameter.VAR_KEYWORD:
                args.append(f"**{pn}")

        # we need to add a `self` argument because we want this to be a bound
        # method of the instrument instance.
        sig_str = str(sig)
        sig_str = sig_str[0] + 'self, ' + sig_str[1:]
        new_func_str = f"""from typing import *\ndef {bp.name}{sig_str}:
        return wrap({', '.join(args)})"""

        # make sure the method knows the wrap function.
        # TODO: this is not complete!
        # NOTE(review): the executed source is assembled from the blue print
        # received from the server, so this trusts the server.
        globs = {'wrap': wrap, 'qcodes': qc}
        exec(new_func_str, globs)
        fun = globs[bp.name]
        fun.__doc__ = bp.docstring
        return fun

    def _getProxySubmodules(self):
        """Based on the submodule blue print replied from server, add the proxy
        submodules to the proxy module class, update the ones that already
        exist, and drop the ones that are no longer in the blue print.
        """
        for sn, s in self.bp.submodules.items():
            if sn not in self.submodules:
                submodule = ProxyInstrumentModule(
                    s.name, cli=self.cli, host=self.host, port=self.port, bluePrint=s)
                self.add_submodule(sn, submodule)
            else:
                self.submodules[sn].update()

        # Delete by the collected key. Deleting by the loop variable of the
        # scan would remove whichever submodule was visited last.
        stale = [sn for sn in self.submodules if sn not in self.bp.submodules]
        for k in stale:
            del self.submodules[k]

    def _refreshProxySubmodules(self):
        """Drop every proxy submodule that is listed in the blue print and
        create it again from the blue print."""
        # Delete by the collected key. Deleting by the loop variable of the
        # scan would raise KeyError as soon as more than one key is collected.
        known = [sn for sn in self.submodules if sn in self.bp.submodules]
        for k in known:
            del self.submodules[k]

        for sn, s in self.bp.submodules.items():
            if sn not in self.submodules:
                submodule = ProxyInstrumentModule(
                    s.name, cli=self.cli, host=self.host, port=self.port, bluePrint=s)
                self.add_submodule(sn, submodule)
            else:
                self.submodules[sn].update()

    def __getattr__(self, item):
        """Resolve ``item`` locally; if that fails, check whether the server
        has meanwhile gained a parameter or submodule of that name and, if so,
        mirror it on the proxy and return it.

        If the server does not know ``item`` either, the error of the local
        lookup is re-raised.
        """
        try:
            return super().__getattr__(item)
        except Exception:
            current_bp = self.cli.getBluePrint(self.remotePath)
            if item in current_bp.parameters and item not in self.parameters:
                self.bp = current_bp
                self._getProxyParameters()
                return getattr(self, item)
            elif item in current_bp.submodules and item not in self.submodules:
                self.bp = current_bp
                self._getProxySubmodules()
                return getattr(self, item)
            else:
                # Re-raise the error of the local lookup unchanged.
                raise


# Alias: `ProxyInstrument` is the same class as `ProxyInstrumentModule`, which
# represents both whole instruments and their submodules.
ProxyInstrument = ProxyInstrumentModule


class Client(BaseClient):
    """Client with common server requests as convenience functions."""

    def list_instruments(self) -> Dict[str, str]:
        """Get the existing instruments on the server."""
        msg = ServerInstruction(operation=Operation.get_existing_instruments)
        return self.ask(msg)

    def find_or_create_instrument(self, instrument_class: str, name: str,
                                  *args: Any, **kwargs: Any) -> ProxyInstrumentModule:
        """Create a new instrument on the server and return a proxy for the
        new instrument.

        :param instrument_class: Class of the instrument to create or a string
            of the class.
        :param name: Name of the new instrument.
        :param args: Positional arguments for new instrument instantiation.
        :param kwargs: Keyword arguments for new instrument instantiation.

        :returns: A new virtual instrument.
        """
        req = ServerInstruction(
            operation=Operation.create_instrument,
            create_instrument_spec=InstrumentCreationSpec(
                instrument_class=instrument_class,
                name=name,
                args=args,
                kwargs=kwargs
            )
        )
        # The reply itself is not needed; the proxy is built from `name`.
        self.ask(req)
        return ProxyInstrumentModule(name=name, cli=self, remotePath=name)

    def close_instrument(self, instrument_name: str):
        """Ask the server to close and remove the instrument
        ``instrument_name``."""
        self.call('close_and_remove_instrument', instrument_name)

    def call(self, target, *args, **kwargs):
        """Call ``target`` on the server with the given arguments and return
        the reply."""
        msg = ServerInstruction(
            operation=Operation.call,
            call_spec=CallSpec(
                target=target, args=args, kwargs=kwargs,
            )
        )
        return self.ask(msg)

    def get_instrument(self, name):
        """Return a proxy for the server-side instrument ``name``."""
        return ProxyInstrumentModule(name=name, cli=self, remotePath=name)

    def getBluePrint(self, path):
        """Request the blue print of the object at ``path`` from the server."""
        msg = ServerInstruction(
            operation=Operation.get_blueprint,
            requested_path=path,
        )
        return self.ask(msg)

    def snapshot(self, instrument: Optional[str] = None, *args, **kwargs):
        """Call ``snapshot`` on the server and return the reply.

        :param instrument: If given, the call targets
            ``<instrument>.snapshot``; otherwise the target is ``snapshot``.
        :param args: Positional arguments forwarded to the remote call.
        :param kwargs: Keyword arguments forwarded to the remote call.
        """
        msg = ServerInstruction(
            operation=Operation.call,
            call_spec=CallSpec(
                target='snapshot' if instrument is None else f"{instrument}.snapshot",
                args=args, kwargs=kwargs,
            )
        )
        return self.ask(msg)

    def getParamDict(self, instrument: Optional[str] = None,
                     attrs: Optional[List[str]] = None, *args, **kwargs):
        """Request a dictionary of parameters from the server.

        :param instrument: Passed to the server as the ``path`` of the
            serialization options.
        :param attrs: Parameter attributes to include. Defaults to
            ``['value']``.
        :param args: Forwarded in the serialization options.
        :param kwargs: Forwarded in the serialization options.
        """
        # A fresh list per call instead of a shared mutable default argument.
        if attrs is None:
            attrs = ['value']

        msg = ServerInstruction(
            operation=Operation.get_param_dict,
            serialization_opts=ParameterSerializeSpec(
                path=instrument,
                attrs=attrs,
                args=args,
                kwargs=kwargs,
            )
        )
        return self.ask(msg)

    def paramsToFile(self, filePath: str, *args, **kwargs):
        """Write the result of :meth:`getParamDict` to ``filePath`` as JSON.

        Missing parent folders are created. ``args`` and ``kwargs`` are
        forwarded to :meth:`getParamDict`.
        """
        filePath = os.path.abspath(filePath)
        folder = os.path.dirname(filePath)

        params = self.getParamDict(*args, **kwargs)
        # `exist_ok` avoids the race between checking for and creating the
        # folder.
        os.makedirs(folder, exist_ok=True)
        with open(filePath, 'w') as f:
            json.dump(params, f, indent=2, sort_keys=True)

    def setParameters(self, parameters: Dict[str, Any]):
        """Send ``parameters`` to the server with the ``set_params``
        operation and return the reply."""
        msg = ServerInstruction(
            operation=Operation.set_params,
            set_parameters=parameters,
        )
        return self.ask(msg)

    def paramsFromFile(self, filePath: str):
        """Load parameters from the JSON file ``filePath`` and send them to
        the server via :meth:`setParameters`.

        If the file does not exist, a warning is logged and nothing is sent.
        """
        if not os.path.exists(filePath):
            logger.warning(f"File {filePath} does not exist. No params loaded.")
            return

        with open(filePath, 'r') as f:
            params = json.load(f)
        self.setParameters(params)
class SubClient(QtCore.QObject):
    """
    Specific subscription client used for real-time parameter updates.
    """

    #: Signal(str) --
    #: emitted when the server broadcast either a new parameter or an update to an existing one.
    update = QtCore.Signal(str)

    def __init__(self, instruments: Optional[List[str]] = None,
                 sub_host: str = 'localhost', sub_port: int = DEFAULT_PORT + 1):
        """
        Creates a new subscription client.

        :param instruments: List of instruments the subclient will listen for.
            If ``None`` or empty it will listen to all broadcasts done by the
            server.
        :param sub_host: The host location of the updates.
        :param sub_port: Should not be changed. It always is the server normal
            port + 1.
        """
        super().__init__()
        self.host = sub_host
        self.port = sub_port
        self.addr = f"tcp://{self.host}:{self.port}"
        self.instruments = instruments

        self.connected = False

    def connect(self):
        """
        Connects the subscription client with the broadcast and runs an
        infinite loop to check for updates.

        It should always be run on a separate thread or the program will get
        stuck in the loop. The loop is left once ``self.connected`` is set to
        ``False`` and the pending receive returns.

        :returns: ``True`` after the loop has ended.
        """
        logger.info(f"Connecting to {self.addr}")
        context = zmq.Context()
        socket = context.socket(zmq.SUB)
        try:
            socket.connect(self.addr)

            # subscribe to the specified instruments; without any, subscribe
            # to everything (the empty prefix matches every message).
            if not self.instruments:
                socket.setsockopt_string(zmq.SUBSCRIBE, '')
            else:
                for ins in self.instruments:
                    socket.setsockopt_string(zmq.SUBSCRIBE, ins)

            self.connected = True

            while self.connected:
                message = socket.recv_multipart()

                # emits the signals already decoded so python recognizes it a
                # string instead of bytes. The payload is read from the second
                # frame of the multipart message.
                self.update.emit(message[1].decode("utf-8"))
        finally:
            # Release the zmq resources however the loop was left.
            self.connected = False
            socket.close(linger=0)
            context.term()

        # NOTE(review): no `disconnect` is defined on this class, so this
        # presumably resolves to the Qt base-class method -- confirm intent.
        self.disconnect()

        return True
class _QtAdapter(QtCore.QObject):
    """Helper base class for `QtClient`.

    Its constructor accepts arbitrary extra positional and keyword arguments
    but passes only ``parent`` on to ``super().__init__``.
    """

    def __init__(self, parent, *arg, **kw):
        super().__init__(parent)


class QtClient(_QtAdapter, Client):
    """`Client` that is also a Qt object (via `_QtAdapter`).

    NOTE(review): ``host``, ``port`` and ``connect`` are handed to
    ``_QtAdapter.__init__``, which does not forward them itself. Whether they
    reach the `Client` initializer depends on the Qt binding and on the
    `Client` base class, neither of which is visible here -- confirm.
    """

    def __init__(self, parent=None, host='localhost', port=DEFAULT_PORT, connect=True):
        super().__init__(parent, host, port, connect)